package com.data.dev.flink.keleiTopic.main;


import com.data.dev.common.javabean.BaseBean;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.flink.keleiTopic.operationForPortScan.kelaiMsgFilter;
import com.data.dev.flink.keleiTopic.operationForPortScan.kelaiMsgToStringMapper;
import com.data.dev.flink.keleiTopic.operationForPortScan.keleiJsonArrayToKeleiFlatMapper;
import com.data.dev.flink.keleiTopic.operationForPortScan.keleiMsgJsonArrayToKeleiListMapper;
import com.data.dev.kafka.KafkaSourceBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;


import java.time.Duration;


/**
 * Job definition that reads the Kelai Kafka topic, applies the port-scan
 * rule operators and writes the resulting alert strings to Elasticsearch.
 *
 * <p>All collaborators ({@link FlinkEnv}, {@link KafkaSourceBuilder},
 * {@link SinkToEs} and the mapper/filter operators) are defined elsewhere in
 * the project; this class only wires them together.
 *
 * @author wangxiaoming-ghq 2022-05-30
 */

@Slf4j
public class KelaiMsg extends BaseBean {

    /** Name of the Elasticsearch index that receives port-scan alerts. */
    public static final String INDEX_NAME = ElasticSearchInfo.ES_PORT_SCAN_INDEX_NAME;

    /** Elasticsearch index type passed to the sink builder. */
    public static final String INDEX_TYPE = ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT;

    /** Job name handed to {@link FlinkEnv#envExec} when the job is submitted. */
    public static final String JOB_NAME = "告警采集平台——端口扫描告警";

    /** Public no-argument constructor. */
    public KelaiMsg() {
    }

    /**
     * Assembles the port-scan job and hands it to {@link FlinkEnv#envExec}
     * for execution under {@link #JOB_NAME}.
     *
     * <p>Steps, in order: obtain the execution environment, build the Kafka
     * source for the Kelai topic/consumer group, attach the rule operators,
     * attach the Elasticsearch sink, then execute.
     */
    public static void execute() {
        // Step 1: execution environment (per the original author's note, checkpointing
        // and similar settings are expected to be configured inside FlinkEnv).
        final StreamExecutionEnvironment flinkEnv = FlinkEnv.getFlinkEnv();

        // Step 2: Kafka source bound to the Kelai topic and its consumer group.
        KafkaSource<String> kelaiSource = KafkaSourceBuilder.getKafkaSource(
                ConfigurationKey.KAFKA_KELAI_TOPIC_NAME,
                ConfigurationKey.KAFKA_KELAI_CONSUMER_GROUP_ID);

        // Step 3: rule operators applied to the topic's messages.
        SingleOutputStreamOperator<String> alertStream = buildAlertStream(flinkEnv, kelaiSource);

        // Step 4: write the resulting alert strings to Elasticsearch.
        ElasticsearchSink<String> alertSink = SinkToEs.getEsSinkBuilder(INDEX_NAME, INDEX_TYPE).build();
        alertStream.addSink(alertSink);

        // Step 5: submit the assembled job.
        FlinkEnv.envExec(flinkEnv, JOB_NAME);
    }

    /**
     * Attaches the source and the map / flatMap / filter / map operator chain
     * to the given environment.
     *
     * @param flinkEnv    environment the source is registered on
     * @param kelaiSource Kafka source producing raw message strings
     * @return the stream emitted by the final mapper
     */
    private static SingleOutputStreamOperator<String> buildAlertStream(
            StreamExecutionEnvironment flinkEnv, KafkaSource<String> kelaiSource) {
        // Watermarks tolerate events arriving up to three minutes out of order.
        WatermarkStrategy<String> watermarks =
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(3));

        // The name(...) labels are runtime operator names and are kept verbatim.
        return flinkEnv.fromSource(kelaiSource, watermarks, "PortScan")
                .map(new keleiMsgJsonArrayToKeleiListMapper()).name("映射，接收消息集合")
                .flatMap(new keleiJsonArrayToKeleiFlatMapper()).name("打平集合提取每一条消息")
                .filter(new kelaiMsgFilter()).name("过滤告警并推送企微")
                .map(new kelaiMsgToStringMapper()).name("消息标准化");
    }

}
